后端学习

您所在的位置:网站首页 java kafka 消费 后端学习

后端学习

2023-04-01 03:12| 来源: 网络整理| 查看: 265

前言

本文主要简单介绍java如何使用消费者,以及介绍消费者的自动提交offset,手动提交offset,自定义提交offset。最后

一、自动提交offset @Test public void testKafkaConsumer(){ // 创建配置信息 Properties properties = new Properties(); // 连接集群 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093"); // 设置消费组 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"testConsumers1"); // 开启自动提交offset,并设置自动提交等待时间 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // 设置没有offset信息时默认消费消息规则 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); // 设置key,value的反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); // 创建消费者 try (KafkaConsumer consumer = new KafkaConsumer(properties)) { // 订阅主题 List topics = Collections.singletonList("first"); consumer.subscribe(topics); System.out.println("开始消费"); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord record:records){ System.out.printf("topic:%s--partition:%s,key:%s--value:%s--offset:%d\n",record.topic(),record.partition(),record.key(),record.value(),record.offset()); } } }catch (Exception e){ System.out.println(e.getMessage()); } }二、手动提交offset

自动提交offset是由我们设置的自动提交延迟来控制,提交时间十分固定不够灵活,使开发人员对应offset提交时间难以把握。若时间设置过快,消费者拿到消息还未处理完成数据offset就提交了,此时消费者宕机重启会不会从宕机的位置消费,而是从offset位置开始消费,导致一部分数据没有被处理,数据丢失。若时间设置过缓,消费者拿到消息处理完成但是offset还没有提交,会导致消费被重复处理。使用手动提交能使offset的提交更为灵活,更不容易出现问题。

1.同步提交

同步提交有重试机制,需要等待提交成功后才会继续消费。

public void testKafkaConsumer(){ // 创建配置信息 Properties properties = new Properties(); // 连接集群 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093"); // 设置消费组 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"testConsumers1"); // 关闭自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 设置没有offset信息时默认消费消息规则 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); // 设置key,value的反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); // 创建消费者 try (KafkaConsumer consumer = new KafkaConsumer(properties)) { // 订阅主题 List topics = Collections.singletonList("first"); consumer.subscribe(topics); System.out.println("开始消费"); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord record:records){ System.out.printf("topic:%s--partition:%s,key:%s--value:%s--offset:%d\n",record.topic(),record.partition(),record.key(),record.value(),record.offset()); } // 同步提交,当前线程会阻塞直到offset提交成功 consumer.commitSync(); } }catch (Exception e){ System.out.println(e.getMessage()); } }2.异步提交

同步提交虽然可靠但是效率比较低。将同步提交替换成异步提交即可。

// 异步提交,不会阻塞线程 consumer.commitAsync();三、自定义提交offset1.概述

无论是同步提交还是异步提交,都有可能造成数据的漏消费或者重复消费。先提交offset后消费,可能造成数据的漏消费,先消费后提交可能造成数据的重复消费。

若需要高精度数据不允许重复消费也不允许漏消费,offset提交和消息处理要么全部失败,要么全部成功,就需要将offset和消息的处理进行事务绑定。这时就需要我们自己维护offset,将其和业务进行事务绑定,就需要使用到自定义提交offset。例如:

# 使用mysql存储数据,在消费的消息处理进入mysql同时将offset也写入mysql,并将上述步骤使用mysql事务绑定。 # 当消费者宕机重启后使用mysql中的offset作为偏移量来消费数据,避免: ## 业务处理完成,offset没有提交宕机造成的消息重复处理; ## offset提交了,业务没有处理完成宕机造成的消息丢失。

实现自定义提交offset需要使用到ConsumerRebalanceListener用于监听消费者的再平衡,因为offset的维护是相当繁琐的,需要考虑消费者的再平衡。当消费者发生再平衡后每个消费者负责的分区都会发生变化。例如:

# 消费者A负责主题1分区1,消费者B负责主题1分区2 # 现在消费者B正在处理消息宕机了发生再平衡,消费者A将负责全部分区。 # 此时就需要读取数据库中的offset并将消费者A的主题1分区2设置为该offset。 # 因为消费者B是正在处理消息中宕机kafka中存储的offset可能不准确要么漏消费,要么重复消费。

当有新的消费者加入消费者或者消费者脱离消费者,消费者订阅分区发生变化都会触发消费者的再平衡(Rebalance)。

使用自定义提交offset,与手动提交基本一致,仅需要修改消费者的订阅加上ConsumerRebalanceListener,以及每次处理完消息后将offset写入数据库。本质上就是不在使用kafka来维护offset,而是自己维护offset,利用其他数据库事务能够保证offset存储与业务数据存储保持一致性。

2.设想

注:本方案仅为设想,未经过测试,可能不正确,仅供参考。

/** * 业务数据通过mysql存储,offset也使用mysql存储 */ // 创建消费者 KafkaConsumer consumer = new KafkaConsumer(properties) // 订阅主题 consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() { /** * 该方法会在消费者的再平衡前调用 * @param collection */ @Override public void onPartitionsRevoked(Collection collection) { // 开始触发再平衡,此时该消费者可能正在处理消息,但再平衡后负责分区将发生变化 // 可能会出现这种情况,A正在消费分区1,还没有来得及提交offset,分区1被分配给B, // B获取还没有被A修改的offset,然后A处理完成提交offset事务完成,B重复消费。 // 解决方案: // 设置状态为再平衡就绪,正在处理消息的线程判断为该状态时,中断所有事务,回滚数据。 } /** * 该方法会在消费者的再平衡后调用 * @param collection */ @Override public void onPartitionsAssigned(Collection collection) { // 获取mysql中的offsets信息,此处不做演示 Map offsets = new HashMap(); // 重置消费者各负责分区的offset for(TopicParition parition:collection){ consumer.seek(parition,offsets.get("主题+分区,如:parition.topic()+parition.parition()"); } } // 以下为消息处理逻辑,此处只是简单描述,仅供参考。 // 消费者获取消费,进行数据处理,以及获取各主题分区的offset // 开启事务 // 将处理好的数据存储mysql // 将offset存入mysql // 上述过程中需要判断消费者是否进入再平衡就绪状态,进入中断事务回滚数据。 });四、消费者组实例

以下为一个简单的多线程消费者组实例,不一定实用,可供参考

consume.java // 接口,消费逻辑

import org.apache.kafka.clients.consumer.ConsumerRecords; public interface Consume { /** * 消费逻辑 */ public void consume(ConsumerRecords records); }

ConsumerGroup.java //消费者组实例

import com.demo.util.Consume; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class ConsumerGroup { private static final Properties properties; // 以下配置信息可以写入配置文件中,以便修改 static { properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); } private ConsumerGroup(){} private List start(Integer num, Consume consume,List topic,Long time,String groupId){ Properties properties0 = (Properties) properties.clone(); properties0.put(ConsumerConfig.GROUP_ID_CONFIG,groupId); for(int i=0;i{ try(KafkaConsumer consumer = new KafkaConsumer(properties)){ consumer.subscribe(topic); while(true){ ConsumerRecords records = consumer.poll(Duration.ofMillis(time)); consume.consume(records); } }catch (Exception e){ System.out.println(e.getMessage()); } }); thread.setName(String.format("consumer%d",i)); list.add(thread); thread.start(); } return list; } }附录其他学习内容

后端学习内容



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3